use crate::call::{CallRequestBuilder, RequestMethod};
use crate::defines::{
    OtaVersion, ServerAddress, DEVICE_SERVICE, DOMAIN, OTA_SERVICE, SEND_BUFFER_SIZE,
};
use crate::errors::{log_err, map_rmpv_write_err};
use crate::handler::{Handler, PendingRequest, RemoteRequest, Response};
use crate::handler::{Message, MessageHandler};
use crate::infor::SysInfo;
use crate::model::{Alarm, CallArg, Event, NamedValue, Prop};
use crate::protocols::{create_writer, Transport, TransportType, TransportWrite};
use crate::util::{copy_slice, from_slice_with_null_to_string};
use crate::{PersisParams, Result};
use alloc::borrow::ToOwned;
use alloc::boxed::Box;
use alloc::collections::LinkedList;
use alloc::string::String;
use alloc::vec::Vec;
use alloc::{format, vec};
use core::ops::Deref;
use embed_std::sync::arc::Arc;
use embed_std::sync::atomic::Ordering;
use embed_std::sync::channel::{sync_channel, Receiver, SyncSender};
use embed_std::sync::mutex::Mutex;
use embed_std::thread::sleep_ms;
use embed_std::time::now;
use embed_std::Error;
use md5_rs::INPUT_BUFFER_LEN;
use mqtt::{LwtOption, MqttClient, MqttConnectOption};
use rmpv::encode::{write_value, write_value_ref};
use rmpv::{Integer, Utf8String, Utf8StringRef, Value, ValueRef};

/// Request timeout in milliseconds: `call` waits this long for a response, and
/// `clean_requests` drops pending entries older than this.
const REQ_TIMEOUT: u32 = 10000;
/// Capacity of the bounded channel created in `Context::do_init`.
const REQ_QUEUE_SIZE: usize = 5;

// Values stored in `ContextInner::status`; the background pull thread reads them.
/// Initial value; the pull thread only sleeps while it sees this.
const STATUS_IDLE: usize = 0;
/// Stored by `connect`; the pull thread polls the client and dispatches messages.
const STATUS_RUNNING: usize = 1;
/// Stored by `stop`; the pull thread leaves its loop when it sees this.
const STATUS_QUITING: usize = 2;

/// Handle around a shared [`ContextInner`]; clones share the same inner value
/// and the handle dereferences to the inner `Arc`.
pub struct Context(Arc<ContextInner>);
impl Context {
    /// Builds the shared context core and starts its background pull thread.
    ///
    /// With the `paho_mqtt` feature enabled, `mqtt::create_mqtt` is always used
    /// as the client factory and the `factory` argument is ignored. Without the
    /// feature, `factory` is forwarded as given.
    ///
    /// # Errors
    /// Propagates any error from `ContextInner::new` or `Context::do_init`.
    pub fn new(
        handler: Box<dyn Handler>,
        factory: Option<fn() -> Box<dyn MqttClient>>,
        info_fetcher: Option<Box<dyn Transport>>,
    ) -> Result<Self> {
        // Shadow the caller-supplied factory when the bundled client is compiled in.
        #[cfg(feature = "paho_mqtt")]
        let factory: Option<fn() -> Box<dyn MqttClient>> = Some(mqtt::create_mqtt);
        let inner = ContextInner::new(handler, factory, info_fetcher)?;
        Self::do_init(inner)
    }

    /// Connects the MQTT client to a message channel and spawns the
    /// `mqtt_pull` thread.
    ///
    /// The client is initialised with a `MessageHandler` that feeds a bounded
    /// channel of `REQ_QUEUE_SIZE` entries. While the status is running, the
    /// thread pulls data from the client and then handles one queued message.
    /// After any failure, or while the status is neither running nor quitting,
    /// it sleeps for one second. It exits once the status is quitting.
    ///
    /// # Errors
    /// Returns the error reported by the client's `init`.
    pub fn do_init(inner: ContextInner) -> Result<Self> {
        let (sender, receiver) = sync_channel(REQ_QUEUE_SIZE);
        let bridge = Box::new(MessageHandler::new(sender));
        {
            let mut client = inner.client.lock().unwrap();
            client.init(Some(bridge))?;
        }
        let context = Self(Arc::new(inner));
        let shared = context.0.clone();
        // NOTE(review): the value returned by `spawn` is discarded, so a failed
        // thread start is not reported to the caller.
        embed_std::thread::Builder::new()
            .name("mqtt_pull".into())
            .stack_size(32 * 1024)
            .spawn(move || {
                infoln!("context thread running...");
                loop {
                    match shared.status.load(Ordering::SeqCst) {
                        STATUS_RUNNING => {
                            // Hold the client lock only for the pull itself.
                            let pull_ok = {
                                let mut client = shared.client.lock().unwrap();
                                let outcome = client.pull_data(0);
                                match outcome {
                                    Ok(_) => true,
                                    Err(err) => {
                                        errorln!("handle data failed: {:?}", err);
                                        false
                                    }
                                }
                            };
                            if pull_ok {
                                let handled = shared.handle_message(&receiver, 10);
                                match handled {
                                    // Healthy iteration: poll again without sleeping.
                                    Ok(_) => continue,
                                    Err(err) => {
                                        errorln!("handle data failed: {:?}", err);
                                    }
                                }
                            }
                        }
                        STATUS_QUITING => break,
                        _ => {}
                    }
                    // Back off when idle or after an error.
                    sleep_ms(1000);
                }
                infoln!("context thread quit!");
            });
        Ok(context)
    }
}

impl Clone for Context {
    fn clone(&self) -> Self {
        Self(self.0.clone())
    }
}

impl Deref for Context {
    type Target = Arc<ContextInner>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

/// Shared state behind a [`Context`]: the MQTT client, the application
/// handler and the bookkeeping for outstanding requests.
pub struct ContextInner {
    /// MQTT client used to connect, subscribe, publish and pull data.
    client: Mutex<Box<dyn MqttClient>>,
    /// Application callbacks invoked for remote set-property and method-call requests.
    handler: Mutex<Box<dyn Handler>>,
    /// Optional serializer used by `report_info`; `SysInfo::get()` is used when `None`.
    info_fetcher: Mutex<Option<Box<dyn Transport>>>,
    /// Wire encoding passed to `create_writer`/`serialize`; set to `Msgpack` in `new`.
    trans_type: TransportType,
    /// Requests issued by `call` that are waiting for, or already hold, a response.
    pending_reqs: Mutex<LinkedList<PendingRequest>>,
    /// Last sequence number handed out by `gen_seq`.
    seq: Mutex<u16>,
    /// State read by the pull thread (`STATUS_IDLE` / `STATUS_RUNNING` / `STATUS_QUITING`).
    status: embed_std::sync::atomic::AtomicUsize,
    /// MQTT session state (`SESSION_DISCONNECT` / `SESSION_CONNECTED`).
    session_status: embed_std::sync::atomic::AtomicUsize,
}

/// `session_status` value: no MQTT session (initial value, also stored by `disconnect`).
const SESSION_DISCONNECT: usize = 0;
/// `session_status` value: stored by `connect` after connecting and subscribing.
const SESSION_CONNECTED: usize = 1;

impl ContextInner {
    pub fn new(
        handler: Box<dyn Handler>,
        factory: Option<fn() -> Box<dyn MqttClient>>,
        fetcher: Option<Box<dyn Transport>>,
    ) -> Result<Self> {
        if factory.is_none() {
            return Err(Error::Other("mqtt creator not set!".to_owned()));
        }
        let mut my = ContextInner {
            client: Mutex::new(factory.unwrap()()),
            handler: Mutex::new(handler),
            info_fetcher: Mutex::new(fetcher),
            trans_type: TransportType::Msgpack,
            pending_reqs: Mutex::new(LinkedList::new()),
            seq: Mutex::new(0),
            status: embed_std::sync::atomic::AtomicUsize::new(STATUS_IDLE),
            session_status: embed_std::sync::atomic::AtomicUsize::new(SESSION_DISCONNECT),
        };
        Ok(my)
    }

    /// Returns this crate's version, read from `CARGO_PKG_VERSION` at compile time.
    pub fn version() -> &'static str {
        const CRATE_VERSION: &str = env!("CARGO_PKG_VERSION");
        CRATE_VERSION
    }

    /// Switches the status to quitting, which makes the pull thread leave its loop.
    ///
    /// This does not disconnect the MQTT client.
    pub fn stop(&self) {
        let status = &self.status;
        status.store(STATUS_QUITING, Ordering::SeqCst);
    }

    /// Drops any existing session, then connects to the broker and subscribes
    /// to this device's request and answer topics.
    ///
    /// The client id (also used as the user name) is `<device_uid>@<product_code>`.
    /// The password comes from `make_mqtt_connect_passwd`. A last-will message
    /// built by `make_lwt` is registered on the device's events topic. On
    /// success the status becomes running and the session is marked connected.
    ///
    /// # Errors
    /// Propagates failures from building the last-will payload, connecting,
    /// or subscribing. In that case the status and session flags are not updated.
    pub fn connect(&self, mqtt_server: &str, mqtt_port: u16) -> Result<()> {
        self.disconnect();

        infoln!("start rust mqtt client...");
        let param = PersisParams::get();
        let product_code = param.product_code_str();
        let device_uid = param.device_uid_str();
        let client_id = format!("{}@{}", device_uid, product_code);
        let passwd = self.make_mqtt_connect_passwd();

        let will_topic = format!(
            "v1/dev/{}/{}/model/reports/events",
            product_code, device_uid
        );
        let will_payload = self.make_lwt("0.0.0.0", client_id.as_str())?;
        let options = MqttConnectOption {
            ver: 3,
            client_id: client_id.as_str(),
            user: client_id.as_str(),
            passwd: passwd.as_str(),
            keep_alive_interval: 20,
            command_timeout: 5000,
            lwt: Some(LwtOption {
                topic: will_topic.as_str(),
                payload: will_payload.as_slice(),
                qos: 1,
                retain: false,
            }),
        };
        infoln!("create MessageHandler");
        // The client stays locked until the session flags have been updated.
        let mut client = self.client.lock().unwrap();
        let _ = client.connect(mqtt_server, mqtt_port, &options)?;
        let filters = [
            format!("v1/dev/{}/{}/+/r/+", product_code, device_uid),
            format!("v1/dev/{}/{}/+/+/r/+", product_code, device_uid),
            format!("v1/f/{}/{}/t/+/+/a/+", product_code, device_uid),
            format!("v1/f/+/+/t/{}/{}/+/+/r/+", product_code, device_uid),
        ];
        for filter in filters.iter() {
            client.subscribe(filter.as_str(), 2)?;
        }
        infoln!("connected and subscribed");
        self.status.store(STATUS_RUNNING, Ordering::SeqCst);
        self.session_status
            .store(SESSION_CONNECTED, Ordering::SeqCst);
        Ok(())
    }

    /// Builds the encoded last-will payload registered on connect.
    ///
    /// The payload is a map with a `connection` entry and a `t` entry of `0`.
    /// `connection` holds a JSON status string (`v`), the constant `13` (`vt`)
    /// and the string's byte length (`vl`).
    // NOTE(review): the meaning of `vt = 13` and `t = 0` is defined by the
    // server protocol, which is not visible here.
    fn make_lwt(&self, ip: &str, user: &str) -> Result<Vec<u8>> {
        let status_json = format!(
            "{{ \"status\": 0, \"ip\" : \"{}\", \"username\": \"{}\" }}",
            ip, user
        );
        let text = |s: &str| Value::String(Utf8String::from(s));

        let connection = Value::Map(vec![
            (text("v"), text(status_json.as_str())),
            (text("vt"), Value::Integer(Integer::from(13))),
            (text("vl"), Value::Integer(Integer::from(status_json.len()))),
        ]);
        let event = Value::Map(vec![
            (text("connection"), connection),
            (text("t"), Value::Integer(Integer::from(0))),
        ]);

        let mut pb = create_writer(256, self.trans_type);
        write_value(pb.payload(), &event).map_err(map_rmpv_write_err)?;
        pb.flush()
    }

    /// Reports whether the MQTT session is considered connected.
    ///
    /// If the client itself reports a lost connection, the session flag is
    /// reset to disconnected before it is read.
    pub fn is_connected(&self) -> bool {
        let mut client = self.client.lock().unwrap();
        let link_up = client.is_connected();
        if !link_up {
            self.session_status
                .store(SESSION_DISCONNECT, Ordering::SeqCst);
        }
        SESSION_CONNECTED == self.session_status.load(Ordering::SeqCst)
    }
    /// Disconnects the MQTT client and marks the session as disconnected.
    ///
    /// The client's result is deliberately ignored, so the session flag is
    /// reset whether or not the client reports an error.
    pub fn disconnect(&self) {
        let mut client = self.client.lock().unwrap();
        drop(client.disconnect());
        let session = &self.session_status;
        session.store(SESSION_DISCONNECT, Ordering::SeqCst);
    }

    /// Polls the pending-request list until the request `seq` has a response
    /// or roughly `timeout` milliseconds have elapsed.
    ///
    /// The list is checked every 10 ms. Only the first entry with a matching
    /// sequence number is considered. Once it carries a response, the entry is
    /// removed and the response returned. Returns `None` on timeout and leaves
    /// the entry in the list.
    fn pull_response(&self, seq: u16, timeout: u32) -> Option<Response> {
        const STEP: u32 = 10;
        let rounds = timeout / STEP + 1;
        for _ in 0..rounds {
            {
                let mut pending = self.pending_reqs.lock().unwrap();
                // Position of the first matching entry, kept only if answered.
                let answered_slot = pending
                    .iter()
                    .enumerate()
                    .find(|(_, entry)| entry.seq == seq)
                    .filter(|(_, entry)| entry.resp.is_some())
                    .map(|(slot, _)| slot);
                if let Some(slot) = answered_slot {
                    let mut finished = pending.remove(slot);
                    return finished.resp.take();
                }
            }
            // The lock is released before sleeping so responses can be stored.
            sleep_ms(STEP);
        }
        None
    }

    /// Drops pending requests at the front of the list that are older than
    /// `REQ_TIMEOUT` milliseconds.
    ///
    /// Cleanup stops at the first entry that has not timed out, so later
    /// entries are only examined once everything before them has expired.
    ///
    /// Timestamps are millisecond counts truncated to `u32` (see `call`), so
    /// they wrap. The age is therefore computed with a wrapping subtraction
    /// rather than `created_time + REQ_TIMEOUT`, which could overflow.
    fn clean_requests(&self) {
        let mut reqs = self.pending_reqs.lock().unwrap();
        let cur = now().as_millis() as u32;
        while let Some(oldest) = reqs.front() {
            let age = cur.wrapping_sub(oldest.created_time);
            if age <= REQ_TIMEOUT {
                break;
            }
            infoln!("slot {} timeout", 0);
            reqs.pop_front();
        }
    }

    /// Placeholder: currently ignores `_props`, sends nothing and returns `Ok(())`.
    pub fn report_props(&self, _props: &Vec<Prop>) -> Result<()> {
        Ok(())
    }

    /// Publishes device information on `v1/dev/<product_code>/<device_uid>/info`.
    ///
    /// The payload is serialized by the configured info fetcher when one was
    /// supplied, otherwise by `SysInfo::get()`. The `_info` argument is not used.
    ///
    /// # Errors
    /// Propagates serialization, flush and publish failures.
    pub fn report_info(&self, _info: &SysInfo) -> Result<()> {
        let mut pb = create_writer(SEND_BUFFER_SIZE, self.trans_type);
        let data = {
            let fetcher = self.info_fetcher.lock().unwrap();
            if let Some(custom) = &*fetcher {
                custom.serialize(self.trans_type, &mut pb)?;
            } else {
                SysInfo::get().serialize(self.trans_type, &mut pb)?;
            }
            pb.flush()?
        };
        let param = PersisParams::get();
        let topic = format!(
            "v1/dev/{}/{}/info",
            param.product_code_str(),
            param.device_uid_str()
        );
        debugln!("report info topic {} data len {}", topic, data.len());
        let mut client = self.client.lock().unwrap();
        client.publish(topic.as_str(), data.as_slice(), 2, false)
    }

    /// Placeholder: currently ignores `_event`, sends nothing and returns `Ok(())`.
    pub fn report_event(&self, _event: &Event) -> Result<()> {
        Ok(())
    }

    /// Placeholder: currently ignores `_alarm`, sends nothing and returns `Ok(())`.
    pub fn report_alarm(&self, _alarm: &Alarm) -> Result<()> {
        Ok(())
    }

    // @return activated token
    pub fn activate(&self) -> Result<String> {
        let mut req = CallRequestBuilder::new();
        let param = PersisParams::get();
        req.url = format!("/v1/devices/{}/activate_secret", param.device_uid_str());
        req.method = RequestMethod::Post;
        let resp = self.call(DOMAIN, DEVICE_SERVICE, req)?;
        if resp.rc != 0 {
            errorln!("activiate return error: {} {}", resp.rc, resp.rd);
            return Err(Error::Other(format!("{}:{}", resp.rc, resp.rd)));
        }
        if resp.body.is_none() || !resp.body.as_ref().unwrap().is_map() {
            return Err(Error::Other(String::from(
                "get activate response invalid data",
            )));
        }
        let root = resp.body.unwrap();
        let mut secret: Option<String> = None;
        for (key, val) in root.as_map().unwrap() {
            if !key.is_str() {
                continue;
            }
            match key.as_str().unwrap() {
                "secret" => {
                    if val.is_str() {
                        secret = Some(String::from(val.as_str().unwrap()));
                    }
                }
                other => {}
            }
        }
        if let Some(v) = secret.take() {
            Ok(v)
        } else {
            Err(Error::Other(String::from("no secret field")))
        }
    }

    /// Not implemented yet.
    ///
    /// # Panics
    /// Always panics via `unimplemented!()`.
    pub fn bind(&self, _user_id: &str) -> Result<()> {
        unimplemented!()
    }

    /// Not implemented yet.
    ///
    /// # Panics
    /// Always panics via `unimplemented!()`.
    pub fn get_server_list(&self) -> Result<Vec<ServerAddress>> {
        unimplemented!()
    }

    /// Queries the OTA service for the latest firmware version of this device.
    ///
    /// Fields of the response map are copied into an `OtaVersion` when they
    /// have the expected type (`last_version`, `download_url`, `check_sum` and
    /// `release_notes` as strings, `force_upgrade` as a bool). Missing or
    /// mistyped fields keep their empty/`false` defaults. Unknown keys are
    /// ignored.
    ///
    /// # Errors
    /// Fails when the call fails, the response code is non-zero, or the body
    /// is missing or not a map.
    pub fn get_ota_version(&self) -> Result<OtaVersion> {
        let mut req = CallRequestBuilder::new();
        let cfg = PersisParams::get();
        req.url = format!(
            "/v1/ota/products/{}/platforms/{}/version?device_uid={}&ver={}",
            cfg.product_code_str(),
            "default",
            cfg.device_uid_str(),
            SysInfo::get().ver.sys,
        );
        req.method = RequestMethod::Get;
        let resp = self.call(DOMAIN, OTA_SERVICE, req)?;
        self.check_resp(&resp)?;
        let fields = match resp.body.as_ref().and_then(|body| body.as_map()) {
            Some(fields) => fields,
            None => {
                return Err(Error::Other(String::from(
                    "get ota version response invalid data",
                )))
            }
        };
        let mut ver = OtaVersion {
            version: String::from(""),
            force_upgrade: false,
            release_notes: String::from(""),
            download_url: String::from(""),
            check_sum: String::from(""),
        };
        for (key, val) in fields {
            let field = match key.as_str() {
                Some(field) => field,
                None => continue,
            };
            // The only non-string field is handled separately.
            if field == "force_upgrade" {
                if let Some(flag) = val.as_bool() {
                    ver.force_upgrade = flag;
                }
                continue;
            }
            let target = match field {
                "last_version" => &mut ver.version,
                "download_url" => &mut ver.download_url,
                "check_sum" => &mut ver.check_sum,
                "release_notes" => &mut ver.release_notes,
                _ => continue,
            };
            if let Some(text) = val.as_str() {
                *target = String::from(text);
            }
        }
        Ok(ver)
    }

    fn check_resp(&self, resp: &Response) -> Result<()> {
        if resp.rc != 0 {
            return Err(Error::Io(resp.rc, resp.rd.clone()));
        }
        Ok(())
    }
    /// Sends a request to `target_id` in `target_domain` and waits for its response.
    ///
    /// The request is published on
    /// `v1/f/<product_code>/<device_uid>/t/<target_domain>/<target_id>/r/<seq>`.
    /// A pending entry is queued before publishing. The call then polls for up
    /// to `REQ_TIMEOUT` milliseconds for the response with the same sequence
    /// number.
    ///
    /// # Errors
    /// Propagates request-building and publish failures, and returns
    /// `Error::Timeout` when no response arrives in time.
    pub fn call(
        &self,
        target_domain: &str,
        target_id: &str,
        req: CallRequestBuilder,
    ) -> Result<Response> {
        let mut pb = create_writer(SEND_BUFFER_SIZE, self.trans_type);
        let buff = pb.payload();
        req.build(buff)?;
        let seq = self.gen_seq();
        let params = PersisParams::get();
        let topic = format!(
            "v1/f/{}/{}/t/{}/{}/r/{}",
            params.product_code_str(),
            params.device_uid_str(),
            target_domain,
            target_id,
            seq
        );
        debugln!("call topic {} datalen {}", topic, buff.len());
        {
            // Register the request before publishing, so it is already queued
            // when the response arrives.
            let pending = PendingRequest {
                seq,
                created_time: now().as_millis() as u32,
                resp: None,
            };
            let mut queue = self.pending_reqs.lock().unwrap();
            infoln!("add pending request seq {}", seq);
            queue.push_back(pending);
        }

        {
            let mut client = self.client.lock().unwrap();
            infoln!("publish {}", topic);
            client.publish(topic.as_str(), buff.as_slice(), 2, false)?;
        }
        match self.pull_response(seq, REQ_TIMEOUT) {
            Some(resp) => {
                debugln!("got response {}", resp.topic);
                Ok(resp)
            }
            None => {
                warnln!("call {} timeout", topic);
                Err(Error::Timeout)
            }
        }
    }

    /// Expires stale pending requests, then processes at most one message
    /// received from `rx` (passing `to` as the receive timeout).
    ///
    /// * A remote request is decoded and dispatched on the third-from-last
    ///   topic segment: `props` goes to `handle_set_prop`, `method` to
    ///   `handle_method_call`. Handler errors are logged, not returned.
    /// * A response is attached to the first pending request with the same
    ///   sequence number, where `pull_response` picks it up.
    ///
    /// Returns `Ok(1)` when a message was processed, and `Ok(0)` when nothing
    /// was received or a request payload could not be decoded.
    ///
    /// A topic with fewer than three segments is reported as an unknown topic
    /// instead of panicking on an out-of-range index.
    fn handle_message(&self, rx: &Receiver<Message>, to: u32) -> Result<usize> {
        self.clean_requests();
        let msg = match rx.recv(to) {
            Ok(msg) => msg,
            Err(_) => return Ok(0),
        };
        let handled = match msg {
            Message::Request(req) => {
                debugln!(
                    "got remote request {} payload len {}",
                    req.topic,
                    req.payload.len()
                );
                match rmpv::decode::read_value(&mut req.payload.as_slice()) {
                    Ok(root) => {
                        // Third segment from the end names the request kind;
                        // `None` when the topic is too short.
                        let kind = req.topic.rsplit('/').nth(2);
                        match kind {
                            Some("props") => {
                                let _ = self.handle_set_prop(req.topic, root).map_err(log_err);
                            }
                            Some("method") => {
                                let _ = self.handle_method_call(req.topic, root).map_err(log_err);
                            }
                            _ => {
                                errorln!("unknown request topic {}", req.topic);
                            }
                        }
                        1
                    }
                    Err(_) => {
                        errorln!("remote request {} invalid payload", req.topic);
                        0
                    }
                }
            }
            Message::Response(seq, resp) => {
                debugln!("got response {}", seq);
                let mut reqs = self.pending_reqs.lock().unwrap();
                debugln!("seek pending reqs {}", reqs.len());
                for v in reqs.iter_mut() {
                    debugln!("v seq {}", v.seq);
                    if v.seq == seq {
                        v.resp = Some(resp);
                        debugln!("set response {}", seq);
                        break;
                    }
                }
                1
            }
        };
        Ok(handled)
    }

    fn handle_set_prop(&self, topic: String, root: Value) -> Result<()> {
        if !root.is_map() {
            return Err(Error::Other(String::from("invalid prop body")));
        }
        let mut props = Vec::new();
        for (key, val) in root.as_map().unwrap() {
            if !key.is_str() || !val.is_array() {
                continue;
            }
            let mut prop = Prop {
                name: String::from(key.as_str().unwrap()),
                features: Vec::new(),
            };
            for fval in val.as_array().unwrap() {
                let fv = NamedValue::parse(fval)?;
                prop.features.push(fv);
            }
            props.push(prop);
        }
        let ret_props = {
            let mut handler = self.handler.lock().unwrap();
            handler.on_set_props(props)?
        };
        let mut rbody = Vec::new();
        rbody.push((
            ValueRef::String(Utf8StringRef::from("rc")),
            ValueRef::Integer(0i32.into()),
        ));
        rbody.push((
            ValueRef::String(Utf8StringRef::from("rd")),
            ValueRef::String(Utf8StringRef::from("ok")),
        ));
        rbody.push((
            ValueRef::String(Utf8StringRef::from("data")),
            Self::from_props_to_rvalue_ref(&ret_props),
        ));

        self.send_rvalue(topic.as_str(), &ValueRef::Map(rbody))
    }

    fn handle_method_call(&self, topic: String, root: Value) -> Result<()> {
        if !root.is_map() {
            return Err(Error::Other(String::from("invalid prop body")));
        }
        let mut method_name: Option<&str> = None;
        let mut args = Vec::new();
        for (key, val) in root.as_map().unwrap() {
            if !key.is_str() {
                continue;
            }
            match key.as_str().unwrap() {
                "name" => {
                    if !val.is_str() {
                        return Err(Error::Other(String::from("name field not str")));
                    }
                    method_name = Some(val.as_str().unwrap());
                }
                "args" => {
                    if val.is_array() {
                        for v in val.as_array().unwrap() {
                            let arg = CallArg::parse(v)?;
                            args.push(arg);
                        }
                    }
                }
                _others => {
                    continue;
                }
            }
        }
        if method_name.is_none() {
            return Err(Error::Other(String::from("not valid name filed")));
        }
        let ret_props = {
            let mut handler = self.handler.lock().unwrap();
            handler.on_call_method(method_name.unwrap(), args)?
        };
        let mut rbody = Vec::new();
        rbody.push((
            ValueRef::String(Utf8StringRef::from("rc")),
            ValueRef::Integer(0i32.into()),
        ));
        rbody.push((
            ValueRef::String(Utf8StringRef::from("rd")),
            ValueRef::String(Utf8StringRef::from("ok")),
        ));
        let mut ps = Vec::new();
        for prop in ret_props.iter() {
            let mut features = Vec::new();
            for f in prop.features.iter() {
                features.push(f.into_rvalue_ref());
            }
            ps.push(ValueRef::Map(vec![(
                ValueRef::String(Utf8StringRef::from(prop.name.as_str())),
                ValueRef::Array(features),
            )]));
        }
        let mut returns = Vec::new();
        returns.push((
            ValueRef::String(Utf8StringRef::from("return")),
            ValueRef::Array(ps),
        ));
        rbody.push((
            ValueRef::String(Utf8StringRef::from("data")),
            ValueRef::Map(returns),
        ));
        self.send_rvalue(topic.as_str(), &ValueRef::Map(rbody))
    }

    /// Converts properties into a borrowed map value: each property name maps
    /// to an array of its features, in the original order.
    fn from_props_to_rvalue_ref(props: &Vec<Prop>) -> ValueRef {
        let entries: Vec<_> = props
            .iter()
            .map(|prop| {
                let features: Vec<_> = prop
                    .features
                    .iter()
                    .map(|f| f.into_rvalue_ref())
                    .collect();
                (
                    ValueRef::String(Utf8StringRef::from(prop.name.as_str())),
                    ValueRef::Array(features),
                )
            })
            .collect();
        ValueRef::Map(entries)
    }

    /// Encodes `rbody` and publishes it as the answer to a request.
    ///
    /// The answer topic is the request `topic` with every `/r/` replaced by `/a/`.
    ///
    /// # Errors
    /// Propagates encoding, flush and publish failures.
    fn send_rvalue(&self, topic: &str, rbody: &ValueRef) -> Result<()> {
        let mut pb = create_writer(SEND_BUFFER_SIZE, self.trans_type);
        write_value_ref(pb.payload(), rbody).map_err(map_rmpv_write_err)?;
        let payload = pb.flush()?;
        let answer_topic = topic.replace("/r/", "/a/");
        let mut client = self.client.lock().unwrap();
        client.publish(answer_topic.as_str(), payload.as_slice(), 2, false)
    }

    /// Builds the MQTT password `0$<md5 hex>$<timestamp>`.
    ///
    /// The MD5 digest is computed over
    /// `<product_code>:<device_uid>:<secret>:<timestamp>`. The timestamp is
    /// the current time in seconds. The secret is the activation secret when
    /// one is stored, otherwise the base secret.
    fn make_mqtt_connect_passwd(&self) -> String {
        let timestamp = now().as_secs();
        let param = PersisParams::get();
        let activated = param.activate_secret_str();
        let secret = if activated.is_empty() {
            param.secret_str()
        } else {
            activated
        };
        let plain_text = format!(
            "{}:{}:{}:{}",
            param.product_code_str(),
            param.device_uid_str(),
            secret,
            timestamp
        );
        // Feed the hasher in pieces of at most INPUT_BUFFER_LEN bytes.
        let mut hasher = md5_rs::Context::new();
        for block in plain_text.as_bytes().chunks(INPUT_BUFFER_LEN) {
            hasher.read(block);
        }
        let digest = hasher.finish();
        let mut passwd = String::from("0$");
        for byte in digest.iter().take(16) {
            passwd.push_str(format!("{:02x}", byte).as_str());
        }
        passwd.push_str(format!("${}", timestamp).as_str());
        passwd
    }

    /// Returns the next request sequence number.
    ///
    /// Numbers count up from 1; after `u16::MAX` the counter restarts at 1, so
    /// 0 is never returned.
    fn gen_seq(&self) -> u16 {
        let mut seq = self.seq.lock().unwrap();
        let next = (*seq).checked_add(1).unwrap_or(1);
        *seq = next;
        next
    }
}
